/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright 2018-2019 The Feast Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package feast.core.model;

import static feast.core.util.StreamUtil.wrapException;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.job.Runner;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.SourceProto.SourceType;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.persistence.*;
import javax.persistence.Entity;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

/** Contains information about a run job. */
@Getter
@Setter
@Entity
@Table(name = "jobs")
@Builder(setterPrefix = "set", toBuilder = true)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class Job extends AbstractTimestampEntity {
  public static class JobBuilder {
    public JobBuilder setSource(Source source) {
      // map source fields into equavilent inline job fields.
      this.sourceType = source.getType();
      this.sourceConfig = source.getConfig();
      return this;
    }
  }

  // Internal job name. Generated by feast ingestion upon invocation.
  @Id private String id;

  // External job id, generated by the runner and retrieved by feast.
  // Used internally for job management.
  @Column(name = "ext_id")
  private String extId;

  // Runner type
  @Enumerated(EnumType.STRING)
  @Column(name = "runner")
  private Runner runner;

  // Source type and config, derieved from job's source and stored as inline fields.
  @Enumerated(EnumType.STRING)
  @Column(name = "source_type")
  private SourceType sourceType;

  @Column(name = "source_config")
  private String sourceConfig;

  // Sinks
  @ManyToMany
  @JoinTable(
      name = "jobs_stores",
      joinColumns = @JoinColumn(name = "job_id"),
      inverseJoinColumns = @JoinColumn(name = "store_name"),
      indexes = {
        @Index(name = "idx_jobs_stores_job_id", columnList = "job_id"),
        @Index(name = "idx_jobs_stores_store_name", columnList = "store_name")
      })
  private Set<Store> stores;

  @Deprecated
  @Column(name = "store_name")
  String storeName;

  // FeatureSets populated by the job via intermediate FeatureSetJobStatus model
  @OneToMany(mappedBy = "job", cascade = CascadeType.ALL)
  private Set<FeatureSetJobStatus> featureSetJobStatuses = new HashSet<>();

  @Enumerated(EnumType.STRING)
  @Column(name = "status", length = 16)
  private JobStatus status;

  public Job() {
    super();
  }

  public boolean hasTerminated() {
    return getStatus().isTerminal();
  }

  public boolean isRunning() {
    return getStatus() == JobStatus.RUNNING;
  }

  public boolean isDeployed() {
    return getExtId() != null && !getExtId().isEmpty();
  }

  public Set<FeatureSet> getFeatureSets() {
    return this.featureSetJobStatuses.stream()
        .map(FeatureSetJobStatus::getFeatureSet)
        .collect(Collectors.toSet());
  }

  public Source getSource() {
    Source source = new Source();
    source.setType(this.sourceType);
    source.setConfig(this.sourceConfig);
    return source;
  }

  public void addAllFeatureSets(Set<FeatureSet> featureSets) {
    for (FeatureSet fs : featureSets) {
      FeatureSetJobStatus status = new FeatureSetJobStatus();
      status.setFeatureSet(fs);
      status.setJob(this);
      if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) {
        // Feature Set was already delivered to previous generation of the job
        // (another words, it exists in kafka)
        // so we expect Job will ack latest version based on history from kafka topic
        status.setVersion(fs.getVersion());
      }
      status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
      this.getFeatureSetJobStatuses().add(status);
    }
  }

  /**
   * Convert a job model to ingestion job proto
   *
   * @return Ingestion Job proto derieved from the given job
   */
  public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferException {

    // convert featuresets of job to protos
    List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
    for (FeatureSetJobStatus featureSet : this.getFeatureSetJobStatuses()) {
      featureSetProtos.add(featureSet.getFeatureSet().toProto());
    }

    // build ingestion job proto with job data
    IngestionJobProto.IngestionJob ingestJob =
        IngestionJobProto.IngestionJob.newBuilder()
            .setId(this.getId())
            .setExternalId(this.getExtId())
            .setStatus(this.getStatus().toProto())
            .setSource(this.getSource().toProto())
            .addAllStores(
                this.getStores().stream()
                    .map(wrapException(Store::toProto))
                    .collect(Collectors.toSet()))
            .addAllFeatureSets(featureSetProtos)
            .build();

    return ingestJob;
  }

  public Job clone() {
    Job job =
        Job.builder()
            .setStores(getStores())
            .setStoreName(getStoreName())
            .setSourceConfig(getSourceConfig())
            .setSourceType(getSourceType())
            .setFeatureSetJobStatuses(new HashSet<>())
            .setRunner(getRunner())
            .setStatus(JobStatus.UNKNOWN)
            .build();
    job.addAllFeatureSets(getFeatureSets());
    return job;
  }

  @Override
  public int hashCode() {
    return Objects.hash(getSource(), this.stores, this.runner);
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) return true;
    if (!super.equals(obj)) return false;
    if (getClass() != obj.getClass()) return false;
    Job other = (Job) obj;
    if (!runner.equals(other.runner)) {
      return false;
    } else if (!getSource().equals(other.getSource())) {
      return false;
    } else if (!stores.equals(other.stores)) {
      return false;
    }
    return true;
  }
}
